Android 14 Display System Analysis 7: How the mConsumerListener Callback Is Executed

7/1/2024

# Recap

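In the previous part we saw that once the app has written its data into a buffer, it calls BufferQueueProducer::queueBuffer to insert the buffer into the queue, and then invokes the callback frameAvailableListener->onFrameAvailable(item) to tell the Consumer that a buffer is ready to be fetched.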

This part answers two questions: where the callback object frameAvailableListener comes from, and how the callback leads the Consumer to consume the buffer.

# Where does frameAvailableListener come from

frameAvailableListener is a local variable that is assigned inside the queueBuffer function:

status_t BufferQueueProducer::queueBuffer(int slot,
        const QueueBufferInput &input, QueueBufferOutput *output) {
        
        // ......

        if (mCore->mQueue.empty()) {
            // When the queue is empty, we can ignore mDequeueBufferCannotBlock
            // and simply queue this buffer
            // If mQueue is empty, push directly into mQueue; blocking is not a concern
            mCore->mQueue.push_back(item);
            // Grab the callback object
            frameAvailableListener = mCore->mConsumerListener;
        } else {
            // When the queue is not empty, we need to look at the last buffer
            // in the queue to see if we need to replace it
            // Check whether the last BufferItem can be dropped
            const BufferItem& last = mCore->mQueue.itemAt(
                    mCore->mQueue.size() - 1);
            if (last.mIsDroppable) {

                if (!last.mIsStale) {
                    mSlots[last.mSlot].mBufferState.freeQueued();

                    // After leaving shared buffer mode, the shared buffer will
                    // still be around. Mark it as no longer shared if this
                    // operation causes it to be free.
                    if (!mCore->mSharedBufferMode &&
                            mSlots[last.mSlot].mBufferState.isFree()) {
                        mSlots[last.mSlot].mBufferState.mShared = false;
                    }
                    // Don't put the shared buffer on the free list.
                    if (!mSlots[last.mSlot].mBufferState.isShared()) {
                        mCore->mActiveBuffers.erase(last.mSlot);
                        mCore->mFreeBuffers.push_back(last.mSlot);
                        output->bufferReplaced = true;
                    }
                }

                // Make sure to merge the damage rect from the frame we're about
                // to drop into the new frame's damage rect.
                if (last.mSurfaceDamage.bounds() == Rect::INVALID_RECT ||
                    item.mSurfaceDamage.bounds() == Rect::INVALID_RECT) {
                    item.mSurfaceDamage = Region::INVALID_REGION;
                } else {
                    item.mSurfaceDamage |= last.mSurfaceDamage;
                }
                // Replace the last BufferItem in the queue with the current one
                // Overwrite the droppable buffer with the incoming one
                mCore->mQueue.editItemAt(mCore->mQueue.size() - 1) = item;
                // Grab the callback object (stored as frameReplacedListener in this branch)
                frameReplacedListener = mCore->mConsumerListener;
            } else {
                // Push directly into mQueue
                mCore->mQueue.push_back(item);
                // Grab the callback object
                frameAvailableListener = mCore->mConsumerListener;
            }
        }


        // ......
}

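Whichever branch runs, the callback object is taken from mCore->mConsumerListener. When the item is pushed it is stored in frameAvailableListener; when it overwrites a droppable item it is stored in frameReplacedListener instead.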
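
To make the push-or-replace branching easier to follow, here is a minimal sketch, not the AOSP code. `Item`, `Notify`, and `MiniQueue` are hypothetical names invented for illustration, and the sketch deliberately ignores the stale-buffer, shared-buffer, and damage-region handling shown in the listing above.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for BufferItem: only the fields this sketch needs.
struct Item {
    int slot;
    bool droppable;
};

// Which local listener variable queueBuffer would have filled in.
enum class Notify { FrameAvailable, FrameReplaced };

// Simplified model of the branch in queueBuffer (assumption: no stale or
// shared buffers, no damage-region merging).
class MiniQueue {
public:
    Notify queue(const Item& item) {
        assert(item.slot >= 0);
        if (items_.empty() || !items_.back().droppable) {
            // Empty queue, or the tail must be kept: append the new item.
            items_.push_back(item);
            return Notify::FrameAvailable;
        }
        // The tail is droppable: return its slot to the free list and
        // overwrite it in place with the incoming item.
        freeSlots_.push_back(items_.back().slot);
        items_.back() = item;
        return Notify::FrameReplaced;
    }

    std::size_t size() const { return items_.size(); }
    const std::vector<int>& freeSlots() const { return freeSlots_; }

private:
    std::vector<Item> items_;
    std::vector<int> freeSlots_;
};
```

Queueing a droppable item and then a second item leaves the queue at length one and puts the first slot back on the free list, which mirrors the `output->bufferReplaced = true` path in the real function.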

So the question becomes: where does mCore->mConsumerListener come from?

Searching the source shows that BufferQueueConsumer::connect assigns mCore->mConsumerListener:

status_t BufferQueueConsumer::connect(
        const sp<IConsumerListener>& consumerListener, bool controlledByApp) {
    ....
    mCore->mConsumerListener = consumerListener;
    mCore->mConsumerControlledByApp = controlledByApp;
}

That in turn raises the next question: when is BufferQueueConsumer::connect called?

In fact, the BLASTBufferQueue initialization chapter already showed that BufferQueueConsumer::connect is called while the BLASTBufferItemConsumer is being constructed. Let's revisit the BLASTBufferItemConsumer constructor:

// The mConsumer argument is a BufferQueueConsumer
mBufferItemConsumer = new BLASTBufferItemConsumer(mConsumer,
                                                      GraphicBuffer::USAGE_HW_COMPOSER |
                                                              GraphicBuffer::USAGE_HW_TEXTURE,
                                                      1, false, this);

class BLASTBufferItemConsumer : public BufferItemConsumer {
public:
    BLASTBufferItemConsumer(const sp<IGraphicBufferConsumer>& consumer, uint64_t consumerUsage,
                            int bufferCount, bool controlledByApp, wp<BLASTBufferQueue> bbq)
          : BufferItemConsumer(consumer, consumerUsage, bufferCount, controlledByApp),
            mBLASTBufferQueue(std::move(bbq)),
            mCurrentlyConnected(false),
            mPreviouslyConnected(false) {}

   // ......
};

This calls the constructor of the parent class BufferItemConsumer:

BufferItemConsumer::BufferItemConsumer(
        const sp<IGraphicBufferConsumer>& consumer, uint64_t consumerUsage,
        int bufferCount, bool controlledByApp) :
    ConsumerBase(consumer, controlledByApp)
{
    status_t err = mConsumer->setConsumerUsageBits(consumerUsage);
    LOG_ALWAYS_FATAL_IF(err != OK,
            "Failed to set consumer usage bits to %#" PRIx64, consumerUsage);
    if (bufferCount != DEFAULT_MAX_BUFFERS) {
        err = mConsumer->setMaxAcquiredBufferCount(bufferCount);
        LOG_ALWAYS_FATAL_IF(err != OK,
                "Failed to set max acquired buffer count to %d", bufferCount);
    }
}

That in turn calls the constructor of its parent class ConsumerBase:

// bufferQueue is a BufferQueueConsumer
ConsumerBase::ConsumerBase(const sp<IGraphicBufferConsumer>& bufferQueue, bool controlledByApp) :
        mAbandoned(false),
        mConsumer(bufferQueue),
        mPrevFinalReleaseFence(Fence::NO_FENCE) {
    // Choose a name using the PID and a process-unique ID.
    mName = String8::format("unnamed-%d-%d", getpid(), createProcessUniqueId());

    // Note that we can't create an sp<...>(this) in a ctor that will not keep a
    // reference once the ctor ends, as that would cause the refcount of 'this'
    // dropping to 0 at the end of the ctor.  Since all we need is a wp<...>
    // that's what we create.
    // listener is the BLASTBufferItemConsumer itself
    wp<ConsumerListener> listener = static_cast<ConsumerListener*>(this);
    // Wrap it in one more proxy layer
    sp<IConsumerListener> proxy = new BufferQueue::ProxyConsumerListener(listener);
    
    // Key line
    status_t err = mConsumer->consumerConnect(proxy, controlledByApp);
    if (err != NO_ERROR) {
        CB_LOGE("ConsumerBase: error connecting to BufferQueue: %s (%d)",
                strerror(-err), err);
    } else {
        mConsumer->setConsumerName(mName);
    }
}

Here the BLASTBufferItemConsumer object is wrapped in a proxy held as an sp<IConsumerListener> and passed to consumerConnect, which is implemented as follows:

    virtual status_t consumerConnect(const sp<IConsumerListener>& consumer,
            bool controlledByApp) {
        return connect(consumer, controlledByApp);
    }

That forwards to connect:

status_t BufferQueueConsumer::connect(
        const sp<IConsumerListener>& consumerListener, bool controlledByApp) {
    ATRACE_CALL();

    if (consumerListener == nullptr) {
        BQ_LOGE("connect: consumerListener may not be NULL");
        return BAD_VALUE;
    }

    BQ_LOGV("connect: controlledByApp=%s",
            controlledByApp ? "true" : "false");

    std::lock_guard<std::mutex> lock(mCore->mMutex);

    if (mCore->mIsAbandoned) {
        BQ_LOGE("connect: BufferQueue has been abandoned");
        return NO_INIT;
    }
    
    // mConsumerListener here is really the wrapper around BLASTBufferItemConsumer
    mCore->mConsumerListener = consumerListener;
    mCore->mConsumerControlledByApp = controlledByApp;

    return NO_ERROR;
}

The value of mCore->mConsumerListener is therefore the BLASTBufferItemConsumer object created earlier, wrapped in a BufferQueue::ProxyConsumerListener proxy object.

That settles it: frameAvailableListener is the BufferQueue::ProxyConsumerListener proxy object that wraps the BLASTBufferItemConsumer.
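
The proxy is built from a `wp<ConsumerListener>`, so it can only reach the consumer while the consumer is still alive. The following is a rough sketch of that weak-proxy idea in standard C++, using `std::weak_ptr` in place of Android's `wp<>`. `Listener`, `WeakProxy`, `CountingConsumer`, and `demoWeakProxy` are hypothetical names, and the real ProxyConsumerListener forwards more callbacks than the one shown here.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical listener interface, standing in for ConsumerListener.
struct Listener {
    virtual ~Listener() = default;
    virtual void onFrameAvailable(int frameNumber) = 0;
};

// Proxy that holds only a weak reference, so whoever stores the proxy
// never keeps the real consumer alive.
class WeakProxy : public Listener {
public:
    explicit WeakProxy(std::weak_ptr<Listener> target) : target_(std::move(target)) {}

    void onFrameAvailable(int frameNumber) override {
        // Promote the weak reference; forward only if the target still exists.
        if (auto strong = target_.lock()) {
            strong->onFrameAvailable(frameNumber);
        }
    }

private:
    std::weak_ptr<Listener> target_;
};

// Hypothetical consumer that records what it was told.
struct CountingConsumer : Listener {
    int lastFrame = -1;
    int calls = 0;
    void onFrameAvailable(int frameNumber) override {
        lastFrame = frameNumber;
        ++calls;
    }
};

// Usage: deliver one frame, destroy the consumer, then deliver another.
int demoWeakProxy() {
    auto consumer = std::make_shared<CountingConsumer>();
    WeakProxy proxy(consumer);
    proxy.onFrameAvailable(7);
    assert(consumer->lastFrame == 7);
    int calls = consumer->calls;
    consumer.reset();
    proxy.onFrameAvailable(8);  // target is gone: silently ignored
    return calls;
}
```

The design choice worth noting is that the side storing the listener (here, whoever owns the proxy) does not extend the consumer's lifetime, which matches the comment in the ConsumerBase constructor about only needing a `wp<...>`.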

# How the Consumer consumes the buffer

From the analysis above, calling frameAvailableListener->onFrameAvailable(item) ends up, by way of the proxy, in BLASTBufferItemConsumer's onFrameAvailable function, which is defined in its base class ConsumerBase.

void ConsumerBase::onFrameAvailable(const BufferItem& item) {
    CB_LOGV("onFrameAvailable");

    sp<FrameAvailableListener> listener;
    { // scope for the lock
        Mutex::Autolock lock(mFrameAvailableMutex);
        listener = mFrameAvailableListener.promote();
    }

    if (listener != nullptr) {
        CB_LOGV("actually calling onFrameAvailable");
        listener->onFrameAvailable(item);
    }
}

This in turn calls onFrameAvailable on ConsumerBase's member mFrameAvailableListener.

So when is ConsumerBase's member mFrameAvailableListener assigned?

In the BLASTBufferQueue constructor:

BLASTBufferQueue::BLASTBufferQueue(const std::string& name, bool updateDestinationFrame)
      : mSurfaceControl(nullptr),
        mSize(1, 1),
        mRequestedSize(mSize),
        mFormat(PIXEL_FORMAT_RGBA_8888),
        mTransactionReadyCallback(nullptr),
        mSyncTransaction(nullptr),
        mUpdateDestinationFrame(updateDestinationFrame) {

    createBufferQueue(&mProducer, &mConsumer);
    // since the adapter is in the client process, set dequeue timeout
    // explicitly so that dequeueBuffer will block
    mProducer->setDequeueTimeout(std::numeric_limits<int64_t>::max());

    // safe default, most producers are expected to override this
    mProducer->setMaxDequeuedBufferCount(2);
    mBufferItemConsumer = new BLASTBufferItemConsumer(mConsumer,
                                                      GraphicBuffer::USAGE_HW_COMPOSER |
                                                              GraphicBuffer::USAGE_HW_TEXTURE,
                                                      1, false, this);
    static std::atomic<uint32_t> nextId = 0;
    mProducerId = nextId++;
    mName = name + "#" + std::to_string(mProducerId);
    auto consumerName = mName + "(BLAST Consumer)" + std::to_string(mProducerId);
    mQueuedBufferTrace = "QueuedBuffer - " + mName + "BLAST#" + std::to_string(mProducerId);
    mBufferItemConsumer->setName(String8(consumerName.c_str()));
    // Key line
    mBufferItemConsumer->setFrameAvailableListener(this);
    //......
}

Once mBufferItemConsumer has been initialized, the constructor calls mBufferItemConsumer->setFrameAvailableListener(this), which sets ConsumerBase's member mFrameAvailableListener to the current BLASTBufferQueue object.
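
The register-then-forward pattern used by ConsumerBase can be sketched as follows. This is an illustration under simplifying assumptions, not the AOSP implementation: `FrameListener`, `MiniConsumerBase`, `MiniBbq`, and `demoForwarding` are made-up names, and `std::weak_ptr` and `std::mutex` stand in for Android's `wp<>` and `Mutex`.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for FrameAvailableListener.
struct FrameListener {
    virtual ~FrameListener() = default;
    virtual void onFrameAvailable(int frameNumber) = 0;
};

// Stand-in for ConsumerBase: stores a weak listener and forwards to it.
class MiniConsumerBase {
public:
    void setFrameAvailableListener(std::weak_ptr<FrameListener> listener) {
        std::lock_guard<std::mutex> lock(mutex_);
        listener_ = std::move(listener);
    }

    void onFrameAvailable(int frameNumber) {
        std::shared_ptr<FrameListener> listener;
        {   // Hold the lock only while promoting the weak reference.
            std::lock_guard<std::mutex> lock(mutex_);
            listener = listener_.lock();
        }
        // The callback runs without the lock held.
        if (listener) {
            listener->onFrameAvailable(frameNumber);
        }
    }

private:
    std::mutex mutex_;
    std::weak_ptr<FrameListener> listener_;
};

// Stand-in for BLASTBufferQueue: just counts the frames it is told about.
struct MiniBbq : FrameListener {
    int framesSeen = 0;
    void onFrameAvailable(int) override { ++framesSeen; }
};

// Usage: a frame that arrives before registration is dropped; later
// frames reach the registered listener.
int demoForwarding() {
    MiniConsumerBase consumer;
    consumer.onFrameAvailable(1);  // no listener registered yet
    auto bbq = std::make_shared<MiniBbq>();
    assert(bbq->framesSeen == 0);
    consumer.setFrameAvailableListener(bbq);
    consumer.onFrameAvailable(2);
    consumer.onFrameAvailable(3);
    return bbq->framesSeen;
}
```

Promoting the weak reference inside the lock but invoking the listener outside it keeps the mutex from being held while arbitrary listener code runs, which is the same shape as the `{ // scope for the lock ... }` block in ConsumerBase::onFrameAvailable.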

In other words, the callback above ends up in BLASTBufferQueue's onFrameAvailable function:

void BLASTBufferQueue::onFrameAvailable(const BufferItem& item) {
    std::function<void(SurfaceComposerClient::Transaction*)> prevCallback = nullptr;
    SurfaceComposerClient::Transaction* prevTransaction = nullptr;

    {
        UNIQUE_LOCK_WITH_ASSERTION(mMutex);
        BBQ_TRACE();

        // false
        bool waitForTransactionCallback = !mSyncedFrameNumbers.empty();

        // false
        const bool syncTransactionSet = mTransactionReadyCallback != nullptr;
        BQA_LOGV("onFrameAvailable-start syncTransactionSet=%s", boolToString(syncTransactionSet));

        if (syncTransactionSet) { // false
           // ......
        }

        // add to shadow queue
        mNumFrameAvailable++;
        if (waitForTransactionCallback && mNumFrameAvailable >= 2) { // not entered
            acquireAndReleaseBuffer();
        }
        ATRACE_INT(mQueuedBufferTrace.c_str(),
                   mNumFrameAvailable + mNumAcquired - mPendingRelease.size());

        BQA_LOGV("onFrameAvailable framenumber=%" PRIu64 " syncTransactionSet=%s",
                 item.mFrameNumber, boolToString(syncTransactionSet));

        if (syncTransactionSet) {
            // Add to mSyncedFrameNumbers before waiting in case any buffers are released
            // while waiting for a free buffer. The release and commit callback will try to
            // acquire buffers if there are any available, but we don't want it to acquire
            // in the case where a sync transaction wants the buffer.
            mSyncedFrameNumbers.emplace(item.mFrameNumber);
            // If there's no available buffer and we're in a sync transaction, we need to wait
            // instead of returning since we guarantee a buffer will be acquired for the sync.
            while (acquireNextBufferLocked(mSyncTransaction) == BufferQueue::NO_BUFFER_AVAILABLE) {
                BQA_LOGD("waiting for available buffer");
                mCallbackCV.wait(_lock);
            }

            // Only need a commit callback when syncing to ensure the buffer that's synced has been
            // sent to SF
            incStrong((void*)transactionCommittedCallbackThunk);
            mSyncTransaction->addTransactionCommittedCallback(transactionCommittedCallbackThunk,
                                                              static_cast<void*>(this));
            if (mAcquireSingleBuffer) {
                prevCallback = mTransactionReadyCallback;
                prevTransaction = mSyncTransaction;
                mTransactionReadyCallback = nullptr;
                mSyncTransaction = nullptr;
            }
        } else if (!waitForTransactionCallback) { // this branch is taken
            acquireNextBufferLocked(std::nullopt);
        }
    }
    if (prevCallback) {
        prevCallback(prevTransaction);
    }
}

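onFrameAvailable calls acquireNextBufferLocked, which calls acquireBuffer to take a buffer out of the queue, builds a transaction object, and submits the transaction carrying the frame to the SurfaceFlinger process. Later, a callback invokes releaseBuffer and notifies the Producer so that it can carry on with its work. We will cover that part in the next section.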
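
The branching at the end of BLASTBufferQueue::onFrameAvailable can be condensed into a small decision function. This is only a sketch of the control flow visible in the listing above: `MiniState`, `Action`, `onFrameAvailableDecision`, and `demoDefaultPath` are hypothetical names, and the sketch omits the locking, the elided first `if (syncTransactionSet)` block, and the acquireAndReleaseBuffer call made when two or more frames pile up.

```cpp
#include <cassert>

// Hypothetical snapshot of the state the real function inspects.
struct MiniState {
    bool syncTransactionSet = false;          // mTransactionReadyCallback != nullptr
    bool waitForTransactionCallback = false;  // !mSyncedFrameNumbers.empty()
    int numFrameAvailable = 0;                // shadow-queue counter
};

enum class Action {
    AcquireIntoSyncTransaction,  // acquireNextBufferLocked(mSyncTransaction)
    AcquireImmediately,          // acquireNextBufferLocked(std::nullopt)
    WaitForCallback              // leave the frame in the shadow queue for now
};

// Simplified decision made for each incoming frame.
Action onFrameAvailableDecision(MiniState& state) {
    ++state.numFrameAvailable;  // "add to shadow queue"
    if (state.syncTransactionSet) {
        return Action::AcquireIntoSyncTransaction;
    }
    if (!state.waitForTransactionCallback) {
        return Action::AcquireImmediately;
    }
    return Action::WaitForCallback;
}

// Usage: the walkthrough's case, where both flags are false.
int demoDefaultPath() {
    MiniState state;
    Action action = onFrameAvailableDecision(state);
    assert(action == Action::AcquireImmediately);
    return state.numFrameAvailable;
}
```

With both flags false, as annotated in the listing, the frame is acquired right away, which is the path this article follows.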

https://app.yinxiang.com/shard/s42/nl/53744825/3a93a288-4b0f-421e-9166-2d217ed57d10